// deno-lint-ignore-file no-explicit-any require-await
/** @format */

import flatten from '../utils/flatten.ts';
import sleep from '../utils/sleep.ts';
import BufferedAsyncIterator from '../utils/bufferedAsyncIterator.ts';
import websiteUrl from '../utils/websiteUrl.ts';
import arrayDiff from '../utils/arrayDiff.ts';
import createRetry from '../retry/index.ts';
import sharedPromiseTo from '../utils/sharedPromiseTo.ts';

import { OffsetManager } from './offsetManager/index.ts';
import Batch from './batch.ts';
import { SeekOffsets } from './seekOffsets.ts';
import { SubscriptionState } from './subscriptionState.ts';
import { events } from './instrumentationEvents.ts';

import { MemberAssignment } from './assignerProtocol.ts';

import { Logger, Assigner, GroupMember, ConsumerGroupOptions } from '../../index.d.ts';

import { Broker } from '../broker/index.ts';
import { Cluster } from '../cluster/index.ts';

import {
  KafkaJSError,
  KafkaJSNonRetriableError,
  KafkaJSStaleTopicMetadataAssignment,
} from '../errors.ts';

const { keys } = Object;
const { GROUP_JOIN, HEARTBEAT, CONNECT, RECEIVED_UNSUBSCRIBED_TOPICS } = events;

const STALE_METADATA_ERRORS = [
  'LEADER_NOT_AVAILABLE',
  // Fetch before v9 uses NOT_LEADER_FOR_PARTITION
  'NOT_LEADER_FOR_PARTITION',
  // Fetch after v9 uses {FENCED,UNKNOWN}_LEADER_EPOCH
  'FENCED_LEADER_EPOCH',
  'UNKNOWN_LEADER_EPOCH',
  'UNKNOWN_TOPIC_OR_PARTITION',
];

const isRebalancing = (e: KafkaJSError) =>
  e.type === 'REBALANCE_IN_PROGRESS' || e.type === 'NOT_COORDINATOR_FOR_GROUP';

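// Symbols used as computed method keys to keep the join/sync/heartbeat internals off
// the public API; the cast to string only satisfies the class's string index signature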
const PRIVATE = {
  JOIN: Symbol('private:ConsumerGroup:join') as unknown as string,
  SYNC: Symbol('private:ConsumerGroup:sync') as unknown as string,
  HEARTBEAT: Symbol('private:ConsumerGroup:heartbeat') as unknown as string,
  SHAREDHEARTBEAT: Symbol('private:ConsumerGroup:sharedHeartbeat') as unknown as string,
};

export class ConsumerGroup {
  [index: string]: any;
  cluster: Cluster;
  groupId: string;
  topics: string[];
  topicsSubscribed: string[];
  topicConfigurations: { [topic: string]: { fromBeginning: boolean } };
  logger: Logger;
  instrumentationEmitter: any;
  retrier: any;
  assigners: Assigner[];
  sessionTimeout: number;
  rebalanceTimeout: number;
  maxBytesPerPartition: number;
  minBytes: number;
  maxBytes: number;
  autoCommit: boolean;
  autoCommitInterval: number | null;
  autoCommitThreshold: number | null;
  isolationLevel: number;
  rackId: string;
  metadataMaxAge: number;
  maxWaitTime: number;
  seekOffset: SeekOffsets;
  coordinator: Broker | null;
  generationId: number | null;
  leaderId: string | null;
  memberId: string | null;
  members: GroupMember[] | null;
  groupProtocol: string | null;
  partitionsPerSubscribedTopic: Map<any, any> | null;
  offsetManager: OffsetManager | null;
  subscriptionState: SubscriptionState | null;
  preferredReadReplicasPerTopicPartition: any;
  lastRequest: number;

  constructor({
    retry,
    cluster,
    groupId,
    topics,
    topicConfigurations,
    logger,
    instrumentationEmitter,
    assigners,
    sessionTimeout,
    rebalanceTimeout,
    maxBytesPerPartition,
    minBytes,
    maxBytes,
    maxWaitTimeInMs,
    autoCommit,
    autoCommitInterval,
    autoCommitThreshold,
    isolationLevel,
    rackId,
    metadataMaxAge,
  }: ConsumerGroupOptions) {
    this.cluster = cluster as Cluster;
    this.groupId = groupId;
    this.topics = topics;
    this.topicsSubscribed = topics;
    this.topicConfigurations = topicConfigurations;
    this.logger = logger.namespace('ConsumerGroup');
    this.instrumentationEmitter = instrumentationEmitter;
    this.retrier = createRetry(Object.assign({}, retry));
    this.assigners = assigners;
    this.sessionTimeout = sessionTimeout;
    this.rebalanceTimeout = rebalanceTimeout;
    this.maxBytesPerPartition = maxBytesPerPartition;
    this.minBytes = minBytes;
    this.maxBytes = maxBytes;
    this.maxWaitTime = maxWaitTimeInMs;
    this.autoCommit = autoCommit;
    this.autoCommitInterval = autoCommitInterval;
    this.autoCommitThreshold = autoCommitThreshold;
    this.isolationLevel = isolationLevel;
    this.rackId = rackId;
    this.metadataMaxAge = metadataMaxAge;
    this.seekOffset = new SeekOffsets();
    this.coordinator = null;
    this.generationId = null;
    this.leaderId = null;
    this.memberId = null;
    this.members = null;
    this.groupProtocol = null;

    this.partitionsPerSubscribedTopic = null;
    /**
     * Preferred read replica per topic and partition
     *
     * Each of the partitions tracks the preferred read replica (`nodeId`) and a timestamp
     * until when that preference is valid.
     *
     * @type {{[topicName: string]: {[partition: number]: {nodeId: number, expireAt: number}}}}
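     *
     * @example
     * // e.g. { 'topic-a': { 0: { nodeId: 2, expireAt: 1700000000000 } } }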
     */
    this.preferredReadReplicasPerTopicPartition = {};
    this.offsetManager = null;
    this.subscriptionState = new SubscriptionState();

    this.lastRequest = Date.now();

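    // Shared promise: concurrent runners reuse a single in-flight heartbeat request,
    // and the request is skipped entirely until `interval` ms have passed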
    this[PRIVATE.SHAREDHEARTBEAT] = sharedPromiseTo(
      async ({ interval }: any) => {
        const { groupId, generationId, memberId } = this;
        const now = Date.now();

        if (memberId && now >= this.lastRequest + interval) {
          const payload = {
            groupId,
            memberId,
            groupGenerationId: generationId,
          };
          await this.coordinator!.heartbeat(payload);
          this.instrumentationEmitter.emit(HEARTBEAT, payload);
          this.lastRequest = Date.now();
        }
      }
    );
  }

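  /** True when this member was elected leader of the group during the last join */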
  isLeader() {
    return this.leaderId && this.memberId === this.leaderId;
  }

  async connect() {
    await this.cluster.connect();
    this.instrumentationEmitter.emit(CONNECT);
    await this.cluster.refreshMetadataIfNecessary();
  }

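  /**
   * JoinGroup phase: finds the group coordinator and joins, storing the returned
   * generation id, leader id, member id, member list and selected group protocol
   */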
  async [PRIVATE.JOIN]() {
    const { groupId, sessionTimeout, rebalanceTimeout } = this;
    //@ts-ignore this will return broker
    this.coordinator = await this.cluster.findGroupCoordinator({ groupId });
    //@ts-ignore - protocoltype will be provided by default
    const groupData = await this.coordinator.joinGroup({ 
      groupId,
      sessionTimeout,
      rebalanceTimeout,
      memberId: this.memberId || '',
      groupProtocols: this.assigners.map((assigner: any) =>
        assigner.protocol({
          topics: this.topicsSubscribed,
        })
      ),
    });

    this.generationId = groupData.generationId;
    this.leaderId = groupData.leaderId;
    this.memberId = groupData.memberId;
    this.members = groupData.members;
    this.groupProtocol = groupData.groupProtocol;
  }

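  /** Notifies the coordinator that this member is leaving the group */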
  async leave() {
    const { groupId, memberId } = this;
    if (memberId) {
      await this.coordinator!.leaveGroup({ groupId, memberId });
      this.memberId = null;
    }
  }

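  /**
   * SyncGroup phase: when this member is the leader it computes the assignment with
   * the matching assigner; every member then receives its own assignment, updates
   * the subscription state and creates a fresh OffsetManager for the new generation
   */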
  async [PRIVATE.SYNC]() {
    let assignment: any[] = [];
    const {
      groupId,
      generationId,
      memberId,
      members,
      groupProtocol,
      topics,
      topicsSubscribed,
      coordinator,
    } = this;

    if (this.isLeader()) {
      this.logger.debug('Chosen as group leader', {
        groupId,
        generationId,
        memberId,
        topics,
      });
      const assigner = this.assigners.find(
        ({ name }: any) => name === groupProtocol
      );

      if (!assigner) {
        throw new KafkaJSNonRetriableError(
          `Unsupported partition assigner "${groupProtocol}", the assigner wasn't found in the assigners list`
        );
      }

      await this.cluster.refreshMetadata();
      //@ts-ignore null ok for members
      assignment = await assigner.assign({ members, topics: topicsSubscribed });

      this.logger.debug('Group assignment', {
        groupId,
        generationId,
        groupProtocol,
        assignment,
        topics: topicsSubscribed,
      });
    }

    // Keep track of the partitions for the subscribed topics
    this.partitionsPerSubscribedTopic =
      this.generatePartitionsPerSubscribedTopic();
    
    const { memberAssignment } = await this.coordinator!.syncGroup({
      groupId,
      //@ts-ignore null ok for params
      generationId,
      //@ts-ignore null ok for params
      memberId,
      groupAssignment: assignment,
    });

    const decodedMemberAssignment = MemberAssignment.decode(memberAssignment);
    const decodedAssignment =
      decodedMemberAssignment != null ? decodedMemberAssignment.assignment : {};

    this.logger.debug('Received assignment', {
      groupId,
      generationId,
      memberId,
      memberAssignment: decodedAssignment,
    });

    const assignedTopics = keys(decodedAssignment);
    const topicsNotSubscribed = arrayDiff(assignedTopics, topicsSubscribed);

    if (topicsNotSubscribed.length > 0) {
      const payload = {
        groupId,
        generationId,
        memberId,
        assignedTopics,
        topicsSubscribed,
        topicsNotSubscribed,
      };

      this.instrumentationEmitter.emit(RECEIVED_UNSUBSCRIBED_TOPICS, payload);
      this.logger.warn('Consumer group received unsubscribed topics', {
        ...payload,
        helpUrl: websiteUrl(
          'docs/faq',
          'why-am-i-receiving-messages-for-topics-i-m-not-subscribed-to'
        ),
      });
    }

    // Remove unsubscribed topics from the list
    const safeAssignment = arrayDiff(assignedTopics, topicsNotSubscribed);
    const currentMemberAssignment = safeAssignment.map((topic: any) => ({
      topic,
      partitions: decodedAssignment[topic],
    }));

    // Check if the consumer is aware of all assigned partitions
    for (const assignment of currentMemberAssignment) {
      const { topic, partitions: assignedPartitions } = assignment;
      const knownPartitions = this.partitionsPerSubscribedTopic.get(topic);
      const isAwareOfAllAssignedPartitions = assignedPartitions.every(
        (partition: any) => knownPartitions.includes(partition)
      );

      if (!isAwareOfAllAssignedPartitions) {
        this.logger.warn(
          'Consumer is not aware of all assigned partitions, refreshing metadata',
          {
            groupId,
            generationId,
            memberId,
            topic,
            knownPartitions,
            assignedPartitions,
          }
        );

        // If the consumer is not aware of all assigned partitions, refresh metadata
        // and update the list of partitions per subscribed topic. It's enough to perform
        // this operation once since refresh metadata will update metadata for all topics
        await this.cluster.refreshMetadata();
        this.partitionsPerSubscribedTopic =
          this.generatePartitionsPerSubscribedTopic();
        break;
      }
    }

    this.topics = currentMemberAssignment.map(({ topic }: any) => topic);
    this.subscriptionState!.assign(currentMemberAssignment);
    this.offsetManager = new OffsetManager({
      cluster: this.cluster,
      topicConfigurations: this.topicConfigurations,
      instrumentationEmitter: this.instrumentationEmitter,
      memberAssignment: currentMemberAssignment.reduce(
        (partitionsByTopic: any, { topic, partitions }: any) => ({
          ...partitionsByTopic,
          [topic]: partitions,
        }),
        {}
      ),
      autoCommit: this.autoCommit,
      autoCommitInterval: this.autoCommitInterval,
      autoCommitThreshold: this.autoCommitThreshold,
      coordinator,
      groupId,
      generationId,
      memberId,
    });
  }

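  /**
   * Joins the group and syncs the assignment in one retried sequence; a rebalance
   * in progress is rewrapped as a retriable error so the whole join + sync restarts
   */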
  joinAndSync() {
    const startJoin = Date.now();
    return this.retrier(async (bail: any) => {
      try {
        await this[PRIVATE.JOIN]();
        await this[PRIVATE.SYNC]();

        const memberAssignment = this.assigned().reduce(
          (result: any, { topic, partitions }: any) => ({
            ...result,
            [topic]: partitions,
          }),
          {}
        );

        const payload = {
          groupId: this.groupId,
          memberId: this.memberId,
          leaderId: this.leaderId,
          isLeader: this.isLeader(),
          memberAssignment,
          groupProtocol: this.groupProtocol,
          duration: Date.now() - startJoin,
        };

        this.instrumentationEmitter.emit(GROUP_JOIN, payload);
        this.logger.info('Consumer has joined the group', payload);
      } catch (e) {
        if (isRebalancing(e)) {
          // Rebalance in progress isn't a retriable protocol error since the consumer
          // has to go through find coordinator and join again before it can
          // actually retry the operation. We wrap the original error in a retriable error
          // here instead in order to restart the join + sync sequence using the retrier.
          throw new KafkaJSError(e);
        }

        bail(e);
      }
    });
  }

  /**
   * @param {import("../../types").TopicPartition} topicPartition
   */
  resetOffset({ topic, partition }: any) {
    this.offsetManager!.resetOffset({ topic, partition });
  }

  /**
   * @param {import("../../types").TopicPartitionOffset} topicPartitionOffset
   */
  resolveOffset({ topic, partition, offset }: any) {
    this.offsetManager!.resolveOffset({ topic, partition, offset });
  }

  /**
   * Update the consumer offset for the given topic/partition. This will be used
   * on the next fetch. If this API is invoked for the same topic/partition more
   * than once, the latest offset will be used on the next fetch.
   *
   * @param {import("../../types").TopicPartitionOffset} topicPartitionOffset
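   *
   * @example
   * // e.g. replay partition 0 of "topic-a" from offset "12" on the next fetch:
   * // consumerGroup.seek({ topic: 'topic-a', partition: 0, offset: '12' });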
   */
  seek({ topic, partition, offset }: any) {
    this.seekOffset.set(topic, partition, offset);
  }

  pause(topicPartitions: any) {
    this.logger.info(`Pausing fetching from ${topicPartitions.length} topics`, {
      topicPartitions,
    });
    this.subscriptionState!.pause(topicPartitions);
  }

  resume(topicPartitions: any) {
    this.logger.info(
      `Resuming fetching from ${topicPartitions.length} topics`,
      {
        topicPartitions,
      }
    );
    this.subscriptionState!.resume(topicPartitions);
  }

  assigned() {
    return this.subscriptionState!.assigned();
  }

  paused() {
    return this.subscriptionState!.paused();
  }

  async commitOffsetsIfNecessary() {
    await this.offsetManager!.commitOffsetsIfNecessary();
  }

  async commitOffsets(offsets: any) {
    await this.offsetManager!.commitOffsets(offsets);
  }

  uncommittedOffsets() {
    return this.offsetManager!.uncommittedOffsets();
  }

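  /**
   * Heartbeats the coordinator at most once per `interval` ms; see the shared
   * promise set up in the constructor for the deduplication of concurrent calls
   */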
  async heartbeat({ interval }: any) {
    return this[PRIVATE.SHAREDHEARTBEAT]({ interval });
  }

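  /**
   * Fetches batches for all active topic partitions: seeks pending offsets, groups
   * partitions per node (leader or preferred read replica) and returns a buffered
   * async iterator that yields batches as each broker responds
   */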
  async fetch() {
    try {
      const { topics, maxBytesPerPartition, maxWaitTime, minBytes, maxBytes } =
        this;
      /** @type {{[nodeId: string]: {topic: string, partitions: { partition: number; fetchOffset: string; maxBytes: number }[]}[]}} */
      const requestsPerNode: any = {};

      await this.cluster.refreshMetadataIfNecessary();
      this.checkForStaleAssignment();

      while (this.seekOffset.size > 0) {
        const seekEntry = this.seekOffset.pop();
        this.logger.debug('Seek offset', {
          groupId: this.groupId,
          memberId: this.memberId,
          seek: seekEntry,
        });
        await this.offsetManager!.seek(seekEntry);
      }

      const pausedTopicPartitions = this.subscriptionState!.paused();
      const activeTopicPartitions = this.subscriptionState!.active();

      const activePartitions = flatten(
        activeTopicPartitions.map(({ partitions }: any) => partitions)
      );
      const activeTopics = activeTopicPartitions
        .filter(({ partitions }: any) => partitions.length > 0)
        .map(({ topic }: any) => topic);

      if (activePartitions.length === 0) {
        this.logger.debug(
          `No active topic partitions, sleeping for ${this.maxWaitTime}ms`,
          {
            topics,
            activeTopicPartitions,
            pausedTopicPartitions,
          }
        );

        await sleep(this.maxWaitTime);
        return BufferedAsyncIterator([]);
      }

      await this.offsetManager!.resolveOffsets();

      this.logger.debug(
        `Fetching from ${activePartitions.length} partitions for ${activeTopics.length} out of ${topics.length} topics`,
        {
          topics,
          activeTopicPartitions,
          pausedTopicPartitions,
        }
      );

      for (const topicPartition of activeTopicPartitions) {
        const partitionsPerNode = this.findReadReplicaForPartitions(
          topicPartition.topic,
          topicPartition.partitions
        );

        const nodeIds = keys(partitionsPerNode);
        const committedOffsets = this.offsetManager!.committedOffsets();

        for (const nodeId of nodeIds) {
          const partitions = partitionsPerNode[nodeId]
            .filter((partition: any) => {
              /**
               * When recovering from OffsetOutOfRange, each partition can recover
               * concurrently, which invalidates resolved and committed offsets as part
               * of the recovery mechanism (see OffsetManager.clearOffsets). In concurrent
               * scenarios this can initiate a new fetch with invalid offsets.
               *
               * This was further highlighted by https://github.com/tulios/kafkajs/pull/570,
               * which increased concurrency, making this more likely to happen.
               *
               * This is solved by only making requests for partitions with initialized offsets.
               *
               * See the following pull request which explains the context of the problem:
               * @issue https://github.com/tulios/kafkajs/pull/578
               */
              return committedOffsets[topicPartition.topic][partition] != null;
            })
            .map((partition: any) => ({
              partition,
              fetchOffset: this.offsetManager!
                .nextOffset(topicPartition.topic, partition)
                .toString(),
              maxBytes: maxBytesPerPartition,
            }));

          requestsPerNode[nodeId] = requestsPerNode[nodeId] || [];
          requestsPerNode[nodeId].push({
            topic: topicPartition.topic,
            partitions,
          });
        }
      }

      const requests = keys(requestsPerNode).map(async (nodeId: any) => {
        const broker = await this.cluster.findBroker({ nodeId });
        const { responses } = await broker.fetch({
          maxWaitTime,
          minBytes,
          maxBytes,
          isolationLevel: this.isolationLevel,
          topics: requestsPerNode[nodeId],
          rackId: this.rackId,
        });

        const batchesPerPartition = responses.map(
          ({ topicName, partitions }: any) => {
            const topicRequestData = requestsPerNode[nodeId].find(
              ({ topic }: any) => topic === topicName
            );
            let preferredReadReplicas =
              this.preferredReadReplicasPerTopicPartition[topicName];
            if (!preferredReadReplicas) {
              this.preferredReadReplicasPerTopicPartition[topicName] =
                preferredReadReplicas = {};
            }

            return partitions
              .filter(
                (partitionData: any) =>
                  !this.seekOffset.has(topicName, partitionData.partition) &&
                  !this.subscriptionState!.isPaused(
                    topicName,
                    partitionData.partition
                  )
              )
              .map((partitionData: any) => {
                const { partition, preferredReadReplica } = partitionData;
                if (
                  preferredReadReplica != null &&
                  preferredReadReplica !== -1
                ) {
                  const { nodeId: currentPreferredReadReplica } =
                    preferredReadReplicas[partition] || {};
                  if (currentPreferredReadReplica !== preferredReadReplica) {
                    this.logger.info(
                      `Preferred read replica is now ${preferredReadReplica}`,
                      {
                        groupId: this.groupId,
                        memberId: this.memberId,
                        topic: topicName,
                        partition,
                      }
                    );
                  }
                  preferredReadReplicas[partition] = {
                    nodeId: preferredReadReplica,
                    expireAt: Date.now() + this.metadataMaxAge,
                  };
                }

                const partitionRequestData = topicRequestData.partitions.find(
                  ({ partition }: any) => partition === partitionData.partition
                );

                const fetchedOffset = partitionRequestData.fetchOffset;
                const batch = new Batch(
                  topicName,
                  fetchedOffset,
                  partitionData
                );

                /**
                 * Resolve the offset to skip the control batch since `eachBatch` or `eachMessage` callbacks
                 * won't process empty batches
                 *
                 * @see https://github.com/apache/kafka/blob/9aa660786e46c1efbf5605a6a69136a1dac6edb9/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1499-L1505
                 */
                if (
                  batch.isEmptyControlRecord() ||
                  batch.isEmptyDueToLogCompactedMessages()
                ) {
                  this.resolveOffset({
                    topic: batch.topic,
                    partition: batch.partition,
                    offset: batch.lastOffset(),
                  });
                }

                return batch;
              });
          }
        );

        return flatten(batchesPerPartition);
      });

      // fetch can generate empty requests when the consumer group receives an assignment
      // with more topics than it subscribed to, so to prevent a busy loop we wait for the
      // configured max wait time
      if (requests.length === 0) {
        await sleep(this.maxWaitTime);
        return BufferedAsyncIterator([]);
      }

      return BufferedAsyncIterator(requests, (e: any) =>
        this.recoverFromFetch(e)
      );
    } catch (e) {
      await this.recoverFromFetch(e);
    }
  }

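  /**
   * Maps fetch errors to a recovery action (metadata refresh, group resync, offset
   * reset or broker removal) and rethrows so the caller's retrier can run again
   */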
  async recoverFromFetch(e: any) {
    if (
      STALE_METADATA_ERRORS.includes(e.type) ||
      e.name === 'KafkaJSTopicMetadataNotLoaded'
    ) {
      this.logger.debug('Stale cluster metadata, refreshing...', {
        groupId: this.groupId,
        memberId: this.memberId,
        error: e.message,
      });

      await this.cluster.refreshMetadata();
      await this.joinAndSync();
      throw new KafkaJSError(e.message);
    }

    if (e.name === 'KafkaJSStaleTopicMetadataAssignment') {
      this.logger.warn(`${e.message}, resync group`, {
        groupId: this.groupId,
        memberId: this.memberId,
        topic: e.topic,
        unknownPartitions: e.unknownPartitions,
      });

      await this.joinAndSync();
    }

    if (e.name === 'KafkaJSOffsetOutOfRange') {
      await this.recoverFromOffsetOutOfRange(e);
    }

    if (e.name === 'KafkaJSConnectionClosedError') {
      this.cluster.removeBroker({ host: e.host, port: e.port });
    }

    if (
      e.name === 'KafkaJSBrokerNotFound' ||
      e.name === 'KafkaJSConnectionClosedError'
    ) {
      this.logger.debug(`${e.message}, refreshing metadata and retrying...`);
      await this.cluster.refreshMetadata();
    }

    throw e;
  }

  async recoverFromOffsetOutOfRange(e: any) {
    // If we are fetching from a follower try with the leader before resetting offsets
    const preferredReadReplicas =
      this.preferredReadReplicasPerTopicPartition[e.topic];
    if (
      preferredReadReplicas &&
      typeof preferredReadReplicas[e.partition] === 'number'
    ) {
      this.logger.info(
        'Offset out of range while fetching from follower, retrying with leader',
        {
          topic: e.topic,
          partition: e.partition,
          groupId: this.groupId,
          memberId: this.memberId,
        }
      );
      delete preferredReadReplicas[e.partition];
    } else {
      this.logger.error('Offset out of range, resetting to default offset', {
        topic: e.topic,
        partition: e.partition,
        groupId: this.groupId,
        memberId: this.memberId,
      });

      await this.offsetManager!.setDefaultOffset({
        topic: e.topic,
        partition: e.partition,
      });
    }
  }

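  /** Maps each subscribed topic to the partition ids currently known to the cluster */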
  generatePartitionsPerSubscribedTopic() {
    const map = new Map();

    for (const topic of this.topicsSubscribed) {
      const partitions = this.cluster
        .findTopicPartitionMetadata(topic)
        .map((m: any) => m.partitionId)
        .sort();

      map.set(topic, partitions);
    }

    return map;
  }

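  /**
   * Throws KafkaJSStaleTopicMetadataAssignment when the cluster reports partitions
   * for a subscribed topic that the current assignment doesn't include
   */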
  checkForStaleAssignment() {
    if (!this.partitionsPerSubscribedTopic) {
      return;
    }

    const newPartitionsPerSubscribedTopic =
      this.generatePartitionsPerSubscribedTopic();

    for (const [topic, partitions] of newPartitionsPerSubscribedTopic) {
      const diff = arrayDiff(
        partitions,
        this.partitionsPerSubscribedTopic.get(topic)
      );

      if (diff.length > 0) {
        throw new KafkaJSStaleTopicMetadataAssignment(
          'Topic has been updated',
          {
            topic,
            unknownPartitions: diff,
          }
        );
      }
    }
  }

  hasSeekOffset({ topic, partition }: any) {
    return this.seekOffset.has(topic, partition);
  }

  /**
   * For each of the partitions find the best nodeId to read it from
   *
   * Invariant: the resulting object references each partition exactly once
   *
   * @param {string} topic
   * @param {number[]} partitions
   * @returns {{[nodeId: number]: number[]}} per-node assignment of partitions
   * @see Cluster~findLeaderForPartitions
   */
  findReadReplicaForPartitions(topic: any, partitions: any) {
    const partitionMetadata = this.cluster.findTopicPartitionMetadata(topic);
    const preferredReadReplicas =
      this.preferredReadReplicasPerTopicPartition[topic];
    return partitions.reduce((result: any, id: any) => {
      const partitionId = parseInt(id, 10);
      const metadata = partitionMetadata.find(
        (p: any) => p.partitionId === partitionId
      );
      if (!metadata) {
        return result;
      }

      if (metadata.leader == null) {
        // KafkaJSError carries only a message, so log the offending topic/partition for context
        this.logger.error('Invalid partition metadata', { topic, partitionId });
        throw new KafkaJSError('Invalid partition metadata');
      }

      // Pick the preferred replica if there is one, and it isn't known to be offline, otherwise the leader.
      let nodeId = metadata.leader;
      if (preferredReadReplicas) {
        const { nodeId: preferredReadReplica, expireAt } =
          preferredReadReplicas[partitionId] || {};
        if (Date.now() >= expireAt) {
          this.logger.debug(
            'Preferred read replica information has expired, using leader',
            {
              topic,
              partitionId,
              groupId: this.groupId,
              memberId: this.memberId,
              preferredReadReplica,
              leader: metadata.leader,
            }
          );
          // Drop the entry
          delete preferredReadReplicas[partitionId];
        } else if (preferredReadReplica != null) {
          // Valid entry, check whether it is not offline
          // Note that we don't delete the preference here, and rather hope that eventually that replica comes online again
          const offlineReplicas = metadata.offlineReplicas;
          if (
            Array.isArray(offlineReplicas) &&
            offlineReplicas.includes(nodeId)
          ) {
            this.logger.debug(
              'Preferred read replica is offline, using leader',
              {
                topic,
                partitionId,
                groupId: this.groupId,
                memberId: this.memberId,
                preferredReadReplica,
                leader: metadata.leader,
              }
            );
          } else {
            nodeId = preferredReadReplica;
          }
        }
      }
      const current = result[nodeId] || [];
      return { ...result, [nodeId]: [...current, partitionId] };
    }, {});
  }
}
